package com.jourwon.spring.boot.service.impl;

import com.jourwon.spring.boot.listener.CustomKafkaListenerRegistrar;
import com.jourwon.spring.boot.prop.CustomKafkaListenerProperty;
import com.jourwon.spring.boot.model.KafkaConsumerTopicPartitionInfo;
import com.jourwon.spring.boot.model.KafkaConsumerInfo;
import com.jourwon.spring.boot.service.KafkaConsumerRegistryService;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.TopicPartition;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;

/**
 * Implementation of the Kafka consumer management service.
 *
 * <p>Exposes listing, registration and lifecycle control (start / pause / resume / stop) of the
 * listener containers held by the {@link KafkaListenerEndpointRegistry}.
 *
 * @author JourWon
 * @date 2022/3/22
 */
@Slf4j
@Service
public class KafkaConsumerRegistryServiceImpl implements KafkaConsumerRegistryService {

    @Resource
    private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

    @Resource
    private CustomKafkaListenerRegistrar customKafkaListenerRegistrar;

    /**
     * Lists every listener container currently known to the registry.
     *
     * <p>An id whose container can no longer be looked up (for example because it was removed
     * between reading the ids and resolving the container) is skipped instead of failing the
     * whole listing.
     *
     * @return one {@link KafkaConsumerInfo} per resolvable container
     */
    @Override
    public List<KafkaConsumerInfo> listConsumerId() {
        return kafkaListenerEndpointRegistry.getListenerContainerIds()
                .stream()
                .map(this::listKafkaConsumerResponse)
                .filter(Objects::nonNull)
                .collect(Collectors.toList());
    }

    /**
     * Registers a new listener for the given topic through the custom registrar.
     *
     * @param topic            topic the new listener subscribes to
     * @param listenerClass    listener class identifier handed to the registrar
     * @param startImmediately forwarded to the registrar; presumably controls whether the
     *                         container is started right after registration
     * @return the value returned by the registrar (presumably the new consumer id)
     */
    @Override
    public String registeConsumer(String topic, String listenerClass, boolean startImmediately) {
        CustomKafkaListenerProperty listenerProperty = CustomKafkaListenerProperty.builder()
                .topic(topic)
                .listenerClass(listenerClass)
                .build();
        // NOTE(review): the first argument is passed as null, exactly as before; its meaning is
        // defined by CustomKafkaListenerRegistrar and is not visible from this class.
        return customKafkaListenerRegistrar.registerCustomKafkaListener(null, listenerProperty, startImmediately);
    }

    /**
     * Starts a registered consumer that is not running.
     *
     * @param consumerId id of the listener container
     * @throws RuntimeException if the consumer does not exist or is already running
     */
    @Override
    public void startConsumer(String consumerId) {
        MessageListenerContainer listenerContainer = requireListenerContainer(consumerId);
        if (listenerContainer.isRunning()) {
            throw new RuntimeException(String.format("Consumer with id %s is already running", consumerId));
        }
        log.info("Running a consumer with id {}", consumerId);
        listenerContainer.start();
    }

    /**
     * Pauses a running consumer.
     *
     * @param consumerId id of the listener container
     * @throws RuntimeException if the consumer does not exist, is not running, is already paused,
     *                          or a pause has already been requested
     */
    @Override
    public void pauseConsumer(String consumerId) {
        MessageListenerContainer listenerContainer = requireListenerContainer(consumerId);
        if (!listenerContainer.isRunning()) {
            throw new RuntimeException(String.format("Consumer with id %s is not running", consumerId));
        }
        if (listenerContainer.isContainerPaused()) {
            throw new RuntimeException(String.format("Consumer with id %s is already paused", consumerId));
        }
        if (listenerContainer.isPauseRequested()) {
            throw new RuntimeException(String.format("Consumer with id %s is already requested to be paused", consumerId));
        }
        log.info("Pausing a consumer with id {}", consumerId);
        listenerContainer.pause();
    }

    /**
     * Resumes a consumer that is paused or for which a pause has been requested.
     *
     * <p>A pending pause request counts as resumable: {@link #pauseConsumer(String)} rejects such
     * a consumer as "already requested to be paused", so refusing to resume it as well would leave
     * no way to cancel the request until the pause had actually taken effect.
     *
     * @param consumerId id of the listener container
     * @throws RuntimeException if the consumer does not exist, is not running, or is neither
     *                          paused nor requested to be paused
     */
    @Override
    public void resumeConsumer(String consumerId) {
        MessageListenerContainer listenerContainer = requireListenerContainer(consumerId);
        if (!listenerContainer.isRunning()) {
            throw new RuntimeException(String.format("Consumer with id %s is not running", consumerId));
        }
        boolean pausedOrPausing = listenerContainer.isContainerPaused() || listenerContainer.isPauseRequested();
        if (!pausedOrPausing) {
            throw new RuntimeException(String.format("Consumer with id %s is not paused", consumerId));
        }
        log.info("Resuming a consumer with id {}", consumerId);
        listenerContainer.resume();
    }

    /**
     * Stops a running consumer.
     *
     * @param consumerId id of the listener container
     * @throws RuntimeException if the consumer does not exist or is not running
     */
    @Override
    public void stopConsumer(String consumerId) {
        MessageListenerContainer listenerContainer = requireListenerContainer(consumerId);
        if (!listenerContainer.isRunning()) {
            throw new RuntimeException(String.format("Consumer with id %s is already stop", consumerId));
        }
        log.info("Stopping a consumer with id {}", consumerId);
        listenerContainer.stop();
    }

    /**
     * Looks up a listener container by id, failing when the registry does not know it.
     *
     * @param consumerId id of the listener container
     * @return the container, never {@code null}
     * @throws RuntimeException if no container is registered under {@code consumerId}
     */
    private MessageListenerContainer requireListenerContainer(String consumerId) {
        MessageListenerContainer listenerContainer = kafkaListenerEndpointRegistry.getListenerContainer(consumerId);
        if (Objects.isNull(listenerContainer)) {
            throw new RuntimeException(String.format("Consumer with id %s is not found", consumerId));
        }
        return listenerContainer;
    }

    /**
     * Builds the description of a single consumer.
     *
     * @param consumerId id of the listener container
     * @return the consumer description, or {@code null} when the registry has no container for
     *         the id; {@code topicPartitions} is {@code null} when the container reports no
     *         assignment
     */
    private KafkaConsumerInfo listKafkaConsumerResponse(String consumerId) {
        MessageListenerContainer listenerContainer = kafkaListenerEndpointRegistry.getListenerContainer(consumerId);
        if (Objects.isNull(listenerContainer)) {
            // The caller filters these out; without this guard the builder chain below would
            // throw a NullPointerException.
            return null;
        }
        List<KafkaConsumerTopicPartitionInfo> topicPartitions =
                Optional.ofNullable(listenerContainer.getAssignedPartitions())
                        .map(assigned -> assigned.stream()
                                .map(this::listKafkaConsumerAssignmentResponse)
                                .collect(Collectors.toList()))
                        .orElse(null);
        return KafkaConsumerInfo.builder()
                .consumerId(consumerId)
                .groupId(listenerContainer.getGroupId())
                .listenerId(listenerContainer.getListenerId())
                .running(listenerContainer.isRunning())
                .topicPartitions(topicPartitions)
                .build();
    }

    /**
     * Converts a Kafka {@link TopicPartition} into its response model.
     *
     * @param topicPartition assigned topic partition
     * @return model carrying the topic name and partition number
     */
    private KafkaConsumerTopicPartitionInfo listKafkaConsumerAssignmentResponse(TopicPartition topicPartition) {
        return KafkaConsumerTopicPartitionInfo.builder()
                .topic(topicPartition.topic())
                .partition(topicPartition.partition())
                .build();
    }

}
